//===--- TaskQueue.inc - Unix-specific TaskQueue ----------------*- C++ -*-===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2017 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//

#include "polarphp/basic/TaskQueue.h"
#include "polarphp/basic/internal/_platform/TaskQueueImplUnix.h"
#include "polarphp/basic/Statistic.h"

#include "llvm/ADT/StringRef.h"
#include "llvm/ADT/DenseMap.h"
#include "llvm/ADT/DenseSet.h"
#include "llvm/Support/ErrorHandling.h"

#include <string>
#include <cerrno>

#ifdef HAVE_POSIX_SPAWN
#include <spawn.h>
#endif

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#if defined(HAVE_GETRUSAGE) && !defined(__HAIKU__)
#include <sys/resource.h>
#endif

#include <poll.h>
#include <sys/types.h>
#include <sys/wait.h>

#if !defined(__APPLE__)
extern char **environ;
#else
extern "C" {
// _NSGetEnviron is from crt_externs.h which is missing in the iOS SDK.
extern char ***_NSGetEnviron(void);
}
#endif

namespace polar {
namespace sys {

#if defined(HAVE_GETRUSAGE) && !defined(__HAIKU__)
/// Build process information from the resource usage reported for \p Pid.
/// CPU times are converted from (seconds, microseconds) pairs into a single
/// microsecond count; the peak RSS is normalized below.
TaskProcessInformation::TaskProcessInformation(ProcessId Pid, struct rusage Usage)
    : TaskProcessInformation(
          Pid,
          // ru_utime, expressed in microseconds.
          static_cast<uint64_t>(Usage.ru_utime.tv_sec) * 1000000 +
              static_cast<uint64_t>(Usage.ru_utime.tv_usec),
          // ru_stime, expressed in microseconds.
          static_cast<uint64_t>(Usage.ru_stime.tv_sec) * 1000000 +
              static_cast<uint64_t>(Usage.ru_stime.tv_usec),
          Usage.ru_maxrss) {
#ifndef __APPLE__
   // Apple systems report ru_maxrss in bytes; everything else appears to
   // report KB, so scale by 1024 to get bytes here as well.
   this->ProcessUsage.getValue().Maxrss <<= 10;
#endif // __APPLE__
}
#endif // defined(HAVE_GETRUSAGE) && !defined(__HAIKU__)

/// Launch the task's executable as a child process whose stdout (and stderr,
/// either into the same pipe or into a separate one when SeparateErrors is
/// set) is redirected into pipes read by this Task.
///
/// \return true if the child could not be started. In that case the Task is
/// marked Finished and every pipe end created here has been closed. Returns
/// false on success.
bool Task::execute() {
   assert(State < TaskState::Executing && "This Task cannot be executed twice!");
   State = TaskState::Executing;

   // Construct argv.
   SmallVector<const char *, 128> Argv;
   Argv.push_back(ExecPath);
   Argv.append(Args.begin(), Args.end());
   Argv.push_back(nullptr); // argv is expected to be null-terminated.

   // Set up the pipe(s). When pipe() fails the descriptor array is never
   // filled in, so the result must be checked before any descriptor is
   // dup'd, stored, or closed.
   int FullPipe[2];
   if (pipe(FullPipe) != 0) {
      State = TaskState::Finished;
      return true;
   }

   int FullErrorPipe[2] = {-1, -1};
   if (SeparateErrors && pipe(FullErrorPipe) != 0) {
      close(FullPipe[0]);
      close(FullPipe[1]);
      State = TaskState::Finished;
      return true;
   }

   Pipe = FullPipe[0];
   if (SeparateErrors) {
      ErrorPipe = FullErrorPipe[0];
   }

   // Get the environment to pass down to the subtask.
   const char *const *envp = Env.empty() ? nullptr : Env.data();
   if (!envp) {
#if __APPLE__
      envp = *_NSGetEnviron();
#else
      envp = environ;
#endif
   }

   const char **argvp = Argv.data();

#ifdef HAVE_POSIX_SPAWN
   posix_spawn_file_actions_t FileActions;
   posix_spawn_file_actions_init(&FileActions);

   posix_spawn_file_actions_adddup2(&FileActions, FullPipe[1], STDOUT_FILENO);

   if (SeparateErrors) {
      posix_spawn_file_actions_adddup2(&FileActions, FullErrorPipe[1],
                                       STDERR_FILENO);
   } else {
      posix_spawn_file_actions_adddup2(&FileActions, STDOUT_FILENO,
                                       STDERR_FILENO);
   }

   posix_spawn_file_actions_addclose(&FileActions, FullPipe[0]);
   if (SeparateErrors) {
      posix_spawn_file_actions_addclose(&FileActions, FullErrorPipe[0]);
   }

   // Spawn the subtask.
   int spawnErr =
      posix_spawn(&Pid, ExecPath, &FileActions, nullptr,
                  const_cast<char **>(argvp), const_cast<char **>(envp));

   posix_spawn_file_actions_destroy(&FileActions);
   close(FullPipe[1]);
   if (SeparateErrors) {
      close(FullErrorPipe[1]);
   }

   if (spawnErr != 0 || Pid == 0) {
      close(FullPipe[0]);
      if (SeparateErrors) {
         close(FullErrorPipe[0]);
      }
      State = TaskState::Finished;
      return true;
   }
#else
   Pid = fork();
   switch (Pid) {
      case -1: {
         close(FullPipe[0]);
         if (SeparateErrors) {
            close(FullErrorPipe[0]);
         }
         State = TaskState::Finished;
         Pid = 0;
         break;
      }
      case 0: {
         // Child process: Execute the program.
         dup2(FullPipe[1], STDOUT_FILENO);
         if (SeparateErrors) {
            dup2(FullErrorPipe[1], STDERR_FILENO);
         } else {
            dup2(STDOUT_FILENO, STDERR_FILENO);
         }
         close(FullPipe[0]);
         if (SeparateErrors) {
            close(FullErrorPipe[0]);
         }
         execve(ExecPath, const_cast<char **>(argvp), const_cast<char **>(envp));

         // If the execve() failed, we should exit. Follow Unix protocol and
         // return 127 if the executable was not found, and 126 otherwise.
         // Use _exit rather than exit so that atexit functions and static
         // object destructors cloned from the parent process aren't
         // redundantly run, and so that any data buffered in stdio buffers
         // cloned from the parent aren't redundantly written out.
         _exit(errno == ENOENT ? 127 : 126);
      }
      default:
         // Parent process: Break out of the switch to do our processing.
         break;
   }

   // The parent only reads; drop its copies of the write ends so EOF/POLLHUP
   // is seen once the child closes its side.
   close(FullPipe[1]);
   if (SeparateErrors) {
      close(FullErrorPipe[1]);
   }

   // fork() failed (Pid was reset to 0 above).
   if (Pid == 0)
      return true;
#endif

   return false;
}

/// Append bytes read from \p Pipe to \p Output.
///
/// \p Pipe must be in blocking mode and must contain unread data. With
/// \p UntilEnd set, reading continues (possibly blocking) until the write end
/// is closed; otherwise at most one successful read() is performed.
/// Interrupted reads (EINTR) are retried. Each successful read bumps the
/// driver pipe-read counter in \p Stats, when provided.
///
/// \return true if read() failed with an error other than EINTR.
static bool readFromAPipe(std::string &Output, int Pipe,
                          UnifiedStatsReporter *Stats, bool UntilEnd) {
   char Chunk[1024];
   for (;;) {
      const ssize_t Got = read(Pipe, Chunk, sizeof(Chunk));
      if (Got == 0)
         return false; // Write end closed: nothing more to read.

      if (Got < 0) {
         // EINTR on a blocking read only means the syscall was interrupted,
         // so retrying indefinitely is correct; any other error is fatal.
         if (errno != EINTR)
            return true;
         continue;
      }

      Output.append(Chunk, Got);
      if (Stats)
         Stats->getDriverCounters().NumDriverPipeReads++;

      if (!UntilEnd)
         return false;
   }
}

/// Read from the stdout pipe and, when stderr is kept separate, from the
/// error pipe too. Both pipes are always read even if the first read fails.
/// \return true if either read reported an error.
bool Task::readFromPipes(bool UntilEnd) {
   const bool OutputFailed = readFromAPipe(Output, Pipe, Stats, UntilEnd);
   const bool ErrorsFailed =
      SeparateErrors && readFromAPipe(Errors, ErrorPipe, Stats, UntilEnd);
   return OutputFailed || ErrorsFailed;
}

/// Mark the task as finished, drain whatever output is still buffered in its
/// pipe(s), and close them.
///
/// In this file it is only called from cleanUpAHungUpTask(), after the
/// stdout pipe reported a hang-up and waitForPid() has reaped the child.
void Task::finishExecution() {
   assert(State == TaskState::Executing &&
          "This Task must be executing to finish execution!");

   State = TaskState::Finished;

   // Read the rest of the command's output, so we can use it later. A single
   // read (UntilEnd == false) would silently drop anything beyond one buffer
   // that was still queued in the pipe when the hang-up was observed. The
   // writer side has hung up, so reading until EOF terminates.
   readFromPipes(/*UntilEnd*/ true);

   close(Pipe);
   if (SeparateErrors) {
      close(ErrorPipe);
   }
}

/// Unix tasks capture their output through pipes, so buffering is supported.
bool TaskQueue::supportsBufferingOutput() { return true; }

/// Unix tasks run as independent child processes, so several may run at once.
bool TaskQueue::supportsParallelExecution() { return true; }

/// \return the configured parallelism, or 1 when none was configured.
unsigned TaskQueue::getNumberOfParallelTasks() const {
   if (NumberOfParallelTasks > 0)
      return NumberOfParallelTasks;

   // TODO: when NumberOfParallelTasks is 0, pick a default > 1 tailored to
   // the current system instead of running tasks serially.
   return 1;
}

/// Queue a new Task for \p ExecPath. The task is only enqueued here; it is
/// launched later when execute() drains QueuedTasks.
void TaskQueue::addTask(const char *ExecPath, ArrayRef<const char *> Args,
                        ArrayRef<const char *> Env, void *Context,
                        bool SeparateErrors) {
   QueuedTasks.push(std::unique_ptr<Task>(
      new Task(ExecPath, Args, Env, Context, SeparateErrors, Stats)));
}

/// Owns the running Tasks and relates each one to its process ID (the map
/// key) and to the stdout pipe fd it is polled on.
/// FIXME: only handles stdout pipes, ignores stderr pipes.
class TaskMap {
   using PidToTaskMap = llvm::DenseMap<pid_t, std::unique_ptr<Task>>;
   PidToTaskMap TasksByPid;

public:
   TaskMap() = default;

   bool empty() const { return TasksByPid.empty(); }
   unsigned size() const { return TasksByPid.size(); }

   /// Take ownership of \p T, keyed by its process ID.
   void add(std::unique_ptr<Task> T) {
      const auto Key = T->getPid();
      TasksByPid[Key] = std::move(T);
   }

   /// Look up the owned Task whose stdout pipe is \p fd (linear scan).
   /// \p fd must belong to one of the owned Tasks.
   Task &findTaskForFd(const int fd) {
      auto Match = std::find_if(
         TasksByPid.begin(), TasksByPid.end(),
         [fd](const PidToTaskMap::value_type &Entry) -> bool {
            return Entry.second->getPipe() == fd;
         });
      assert(Match != TasksByPid.end() &&
             "All outstanding fds must be associated with a  Task");
      return *Match->second;
   }

   /// Destroy \p T; the reference dangles once this returns.
   void destroyTask(Task &T) {
      const auto Key = T.getPid();
      TasksByPid.erase(Key);
   }
};

/// Concurrently execute the tasks in the TaskQueue, collecting the outputs from
/// each task.
/// Maintains invariants connecting tasks to execute, tasks currently executing,
/// and fds being polled. These invariants include:
/// - A task is not in both TasksToBeExecuted and TasksBeingExecuted.
/// - A task is executing iff it is in TasksBeingExecuted.
/// - A task is executing iff any of its fds being polled are in FdsBeingPolled
///   (these should be all of its output fds, but today only stdout is polled).
/// When a task has finished executing, the monitor waits for it to die, takes
/// action appropriate to the cause of death, then reclaims its storage.
class TaskMonitor {
   /// Tasks not yet started. This is a reference to the caller's queue (the
   /// TaskQueue's QueuedTasks); tasks are popped from its front as parallel
   /// slots become free.
   std::queue<std::unique_ptr<Task>> &TasksToBeExecuted;
   /// Tasks whose subprocess has been launched and not yet cleaned up.
   TaskMap TasksBeingExecuted;

   /// The array handed to poll(): one entry per running task's stdout pipe.
   std::vector<struct pollfd> FdsBeingPolled;

   /// Upper bound on the number of simultaneously running tasks; the
   /// constructor maps a requested value of 0 to 1.
   const unsigned MaxNumberOfParallelTasks;

public:
   /// Client hooks run when a task begins, finishes, or is killed by a signal,
   /// plus one run after every poll(). Any of them may be empty.
   struct Callbacks {
      const TaskQueue::TaskBeganCallback TaskBegan;
      const TaskQueue::TaskFinishedCallback TaskFinished;
      const TaskQueue::TaskSignalledCallback TaskSignalled;
      const std::function<void()> PolledAnFd;
   };

private:
   Callbacks callbacks;

public:
   TaskMonitor(std::queue<std::unique_ptr<Task>> &TasksToBeExecuted,
               const unsigned NumberOfParallelTasks, const Callbacks &callbacks)
      : TasksToBeExecuted(TasksToBeExecuted),
        MaxNumberOfParallelTasks(
           NumberOfParallelTasks == 0 ? 1 : NumberOfParallelTasks),
        callbacks(callbacks) {}

   /// Run the tasks to be executed.
   /// \return true on error.
   bool executeTasks();

private:
   /// True once nothing is queued and nothing is still running.
   bool isFinishedExecutingTasks() const {
      return TasksBeingExecuted.empty() && TasksToBeExecuted.empty();
   }

   /// Start up tasks if we aren't already at the parallel limit, and no earlier
   /// subtasks have failed.
   /// \return true on error.
   bool startUpSomeTasks();

   /// Launch \p T and notify the TaskBegan callback.
   /// \return true on error.
   bool beginExecutingATask(Task &T);

   /// Enter the task and its outputs in this TaskMonitor's data structures so
   /// it can be polled.
   void startPollingFdsOfTask(const Task &T);

   /// Remove the given (already closed) fds from FdsBeingPolled.
   void stopPolling(ArrayRef<int> FinishedFds);

   /// HardError aborts execution; SoftError (EAGAIN/EINTR) means poll again.
   enum class PollResult { HardError, SoftError, NoError };
   PollResult pollTheFds();

   /// \return None on error.
   Optional<std::vector<int>> readFromReadyFdsReturningFinishedOnes();

   /// Ensure that events bits returned from polling are what's expected.
   void verifyEvents(short events) const;

   /// Read some buffered output from \p T if \p events says data is ready.
   void readDataIfAvailable(short events, int fd, Task &T) const;

   /// True if \p events contains POLLHUP or POLLERR.
   bool didTaskHangup(short events) const;
};

bool TaskMonitor::executeTasks() {
   while (!isFinishedExecutingTasks()) {
      if (startUpSomeTasks())
         return true;

      switch (pollTheFds()) {
         case PollResult::HardError:
            return true;
         case PollResult::SoftError:
            continue;
         case PollResult::NoError:
            break;
      }
      Optional<std::vector<int>> FinishedFds =
         readFromReadyFdsReturningFinishedOnes();
      if (!FinishedFds)
         return true;

      stopPolling(*FinishedFds);
   }
   return false;
}

/// Move queued tasks into execution until the queue is empty or the
/// parallelism cap is reached.
/// \return true if a task failed to launch.
bool TaskMonitor::startUpSomeTasks() {
   while (TasksBeingExecuted.size() < MaxNumberOfParallelTasks) {
      if (TasksToBeExecuted.empty())
         break;

      std::unique_ptr<Task> Next = std::move(TasksToBeExecuted.front());
      TasksToBeExecuted.pop();

      if (beginExecutingATask(*Next))
         return true; // Next is released here without ever being polled.

      startPollingFdsOfTask(*Next);
      TasksBeingExecuted.add(std::move(Next));
   }
   return false;
}

void TaskMonitor::startPollingFdsOfTask(const Task &T) {
   // Only the stdout pipe is watched. Polling T.getErrorPipe() too would be
   // more complete, but introduces timing issues with shutting down the task
   // after reading getPipe().
   struct pollfd Entry = {};
   Entry.fd = T.getPipe();
   Entry.events = POLLIN | POLLPRI | POLLHUP;
   Entry.revents = 0;
   FdsBeingPolled.push_back(Entry);
}

/// Block in poll() until at of least one watched fd has an event, then run the
/// PolledAnFd callback (if any).
/// \return NoError on success, SoftError if poll() failed with EAGAIN or
/// EINTR (the caller should retry), HardError for any other failure.
TaskMonitor::PollResult TaskMonitor::pollTheFds() {
   assert(!FdsBeingPolled.empty() &&
          "We should only call poll() if we have fds to watch!");
   int ReadyFdCount = poll(FdsBeingPolled.data(), FdsBeingPolled.size(), -1);
   // Capture errno right away: the callback below is arbitrary client code
   // and may make library calls that overwrite errno before we classify the
   // poll() failure.
   const int PollErrno = errno;
   if (callbacks.PolledAnFd)
      callbacks.PolledAnFd();
   if (ReadyFdCount != -1)
      return PollResult::NoError;
   return PollErrno == EAGAIN || PollErrno == EINTR ? PollResult::SoftError
                                                    : PollResult::HardError;
}

/// Launch \p T; on success, report it through the TaskBegan callback.
/// \return true if the launch failed (the callback is then not invoked).
bool TaskMonitor::beginExecutingATask(Task &T) {
   const bool LaunchFailed = T.execute();
   if (!LaunchFailed && callbacks.TaskBegan)
      callbacks.TaskBegan(T.getPid(), T.getContext());
   return LaunchFailed;
}

// Forward declarations of the file-local helpers that reap a finished child
// and report its fate to the client callbacks; they are defined further down.

static bool
cleanUpAHungUpTask(Task &T,
                   const TaskQueue::TaskFinishedCallback FinishedCallback,
                   const TaskQueue::TaskSignalledCallback SignalledCallback);

/// Wait for the process with a given pid to finish.
///
/// \param pidToWaitFor the pid of the process to wait for.
/// \return the wait status (None if waiting failed) paired with the process
/// information collected for the child.
static std::pair<Optional<int>, TaskProcessInformation>
waitForPid(const pid_t pidToWaitFor);

static bool
cleanUpAfterSignal(int Status, const Task &T, TaskProcessInformation ProcInfo,
                   const TaskQueue::TaskSignalledCallback SignalledCallback);

static bool
cleanUpAfterExit(int Status, const Task &T, TaskProcessInformation ProcInfo,
                 const TaskQueue::TaskFinishedCallback FinishedCallback);

/// Service every polled fd: read available output, and for fds that hung up,
/// reap and destroy the owning task.
/// \return the fds whose tasks were retired, or None if cleaning up a task
/// reported an error (processing stops at that task).
Optional<std::vector<int>>
TaskMonitor::readFromReadyFdsReturningFinishedOnes() {
   std::vector<int> Retired;
   for (auto &Entry : FdsBeingPolled) {
      // Consume the events poll() reported and reset them for the next round.
      const short Events = Entry.revents;
      Entry.revents = 0;
      verifyEvents(Events);

      const int Fd = Entry.fd;
      Task &Owner = TasksBeingExecuted.findTaskForFd(Fd);
      readDataIfAvailable(Events, Fd, Owner);

      if (didTaskHangup(Events)) {
         Retired.push_back(Fd);
         const bool CleanupFailed = cleanUpAHungUpTask(
            Owner, callbacks.TaskFinished, callbacks.TaskSignalled);
         TasksBeingExecuted.destroyTask(Owner); // Owner dangles from here on.
         if (CleanupFailed)
            return None;
      }
   }
   return Retired;
}

void TaskMonitor::verifyEvents(const short events) const {
   // POLLNVAL would mean poll() was asked to watch an fd that is already
   // closed. That should never happen: fds are dropped from FdsBeingPolled
   // once Task::finishExecution() has closed them.
   assert(!(events & POLLNVAL) && "Asked poll() to watch a closed fd");

   // Anything outside the readable/hang-up/error set is unexpected.
   const short Allowed = POLLIN | POLLPRI | POLLHUP | POLLERR;
   assert(!(events & ~Allowed) && "Received unexpected event");
   (void)Allowed;
}

void TaskMonitor::readDataIfAvailable(const short events, const int fd,
                                      Task &T) const {
   (void)fd; // The pipe to read is taken from T itself.

   const bool HasData = (events & (POLLIN | POLLPRI)) != 0;
   if (!HasData)
      return;

   // Read only _some_ of the available data: the pipe is in blocking mode and
   // other subprocesses may have (or soon have) input pending too, so this
   // must not wait for this subprocess to finish writing.
   //
   // FIXME: longer term, this should probably either be restructured to
   // use O_NONBLOCK, or at very least poll the stderr file descriptor as
   // well; the whole loop here is a bit of a mess.
   T.readFromPipes(/*UntilEnd*/ false);
}

bool TaskMonitor::didTaskHangup(const short events) const {
   // A hang-up or an error condition both end polling of this task's pipe.
   const short EndOfTaskMask = POLLHUP | POLLERR;
   return (events & EndOfTaskMask) != 0;
}

/// Reap the child of a task whose pipe hung up, finish the task, and dispatch
/// to the exit or signal handler depending on how the child terminated.
/// \return true if execution should stop (wait failure or a handler said so).
static bool
cleanUpAHungUpTask(Task &T,
                   const TaskQueue::TaskFinishedCallback FinishedCallback,
                   const TaskQueue::TaskSignalledCallback SignalledCallback) {
   const auto Reaped = waitForPid(T.getPid());
   if (!Reaped.first)
      return true; // Could not wait for the child: no status to report.

   T.finishExecution();

   const int Status = *Reaped.first;
   const TaskProcessInformation &ProcInfo = Reaped.second;
   if (WIFEXITED(Status))
      return cleanUpAfterExit(Status, T, ProcInfo, FinishedCallback);
   if (WIFSIGNALED(Status))
      return cleanUpAfterSignal(Status, T, ProcInfo, SignalledCallback);

   // Neither exited nor signalled. Presumably unreachable, because waitForPid
   // does not ask for stopped/continued children; not treated as an error.
   return false;
}

/// Block until the process \p pidToWaitFor terminates, retrying when the wait
/// is interrupted.
///
/// \return the raw wait status paired with the process information for the
/// child (including resource usage when wait4() is available). The status is
/// None if waiting failed with ECHILD or EINVAL.
static std::pair<Optional<int>, TaskProcessInformation> waitForPid(const pid_t pidToWaitFor) {
   for (;;) {
      int Status = 0;

#if defined(HAVE_GETRUSAGE) && !defined(__HAIKU__) && defined(HAVE_WAIT4)
      // Zero-initialized, and only turned into a TaskProcessInformation on
      // success, so a failed wait4() never causes indeterminate fields to be
      // read.
      struct rusage Usage = {};
      const pid_t pidFromWait = wait4(pidToWaitFor, &Status, 0, &Usage);
      if (pidFromWait == pidToWaitFor)
         return std::make_pair(Status,
                               TaskProcessInformation(pidToWaitFor, Usage));
#else
      const pid_t pidFromWait = waitpid(pidToWaitFor, &Status, 0);
      if (pidFromWait == pidToWaitFor)
         return std::make_pair(Status, TaskProcessInformation(pidToWaitFor));
#endif

      assert(pidFromWait == -1 &&
             "Did not pass WNOHANG, should only get pidToWaitFor or -1");
      if (errno == ECHILD || errno == EINVAL)
         return std::make_pair(None, TaskProcessInformation(pidToWaitFor));
      // Otherwise (EINTR) the wait was interrupted: try again.
   }
}

/// Handle a child that exited normally.
/// \return true if execution should stop. Without a FinishedCallback that
/// means any nonzero exit code; with one, the callback decides.
static bool
cleanUpAfterExit(int Status, const Task &T, TaskProcessInformation ProcInfo,
                 const TaskQueue::TaskFinishedCallback FinishedCallback) {
   const int ExitCode = WEXITSTATUS(Status);

   if (FinishedCallback) {
      // Only an explicit StopExecution from the callback counts as an error.
      const auto Response =
         FinishedCallback(T.getPid(), ExitCode, T.getOutput(), T.getErrors(),
                          ProcInfo, T.getContext());
      return Response == TaskFinishedResponse::StopExecution;
   }

   // No callback: a nonzero exit code means the subtask failed.
   return ExitCode != 0;
}

/// Handle a child that was terminated by a signal.
/// \return true if execution should stop. Without a SignalledCallback a
/// signalled child always counts as a failure; with one, the callback decides.
static bool
cleanUpAfterSignal(int Status, const Task &T, TaskProcessInformation ProcInfo,
                   const TaskQueue::TaskSignalledCallback SignalledCallback) {
   // The process exited due to a signal.
   const int Signal = WTERMSIG(Status);

   if (!SignalledCallback) {
      // Since we don't have a TaskSignalledCallback, treat a crashing
      // subtask as having failed.
      return true;
   }

   // POSIX leaves strsignal()'s result unspecified for signal numbers it does
   // not recognize. Never hand a null pointer to StringRef.
   const char *SignalName = strsignal(Signal);
   StringRef ErrorMsg = SignalName ? SignalName : "Unknown signal";

   // With a callback, only return an error if the callback returns
   // StopExecution.
   return TaskFinishedResponse::StopExecution ==
          SignalledCallback(T.getPid(), ErrorMsg, T.getOutput(), T.getErrors(),
                            T.getContext(), Signal, ProcInfo);
}

/// Drop each fd in \p FinishedFds (already closed by its Task) from the poll
/// set. Each fd must currently be present in FdsBeingPolled.
void TaskMonitor::stopPolling(ArrayRef<int> FinishedFds) {
   for (const int Closed : FinishedFds) {
      const auto Match = std::find_if(
         FdsBeingPolled.begin(), FdsBeingPolled.end(),
         [Closed](const struct pollfd &Entry) { return Entry.fd == Closed; });
      assert(Match != FdsBeingPolled.end() &&
             "The finished fd must be in FdsBeingPolled!");
      FdsBeingPolled.erase(Match);
   }
}

/// Run every queued task, at most getNumberOfParallelTasks() at a time,
/// reporting progress through the given callbacks.
/// \return true on error.
bool TaskQueue::execute(TaskBeganCallback BeganCallback,
                        TaskFinishedCallback FinishedCallback,
                        TaskSignalledCallback SignalledCallback) {
   // Record each poll() in the driver statistics when stats are enabled.
   auto CountPoll = [&] {
      if (Stats)
         ++Stats->getDriverCounters().NumDriverPipePolls;
   };

   TaskMonitor::Callbacks callbacks{BeganCallback, FinishedCallback,
                                    SignalledCallback, CountPoll};
   TaskMonitor Monitor(QueuedTasks, getNumberOfParallelTasks(), callbacks);
   return Monitor.executeTasks();
}

} // end namespace sys
} // end namespace polar